Laravel Queue——消息队列任务与分发源码剖析

前言

在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafkaRabbitMQ 等重量级的消息队列,但是又的确需要异步、重试、并发控制等功能。通常来说,我们经常会使用 RedisBeanstalkAmazon SQS 来实现相关功能,laravel 为此对不同的后台队列服务提供统一的 API,本文将会介绍应用最为广泛的 redis 队列。

本文参考文档资料:

使用 Laravel Queue 不得不明白的知识

Laravel 的消息队列剖析

背景知识

在讲解 laravel 的队列服务之前,我们要先说说基于 redis 的队列服务。首先,redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,

redis 队列的数据结构

  • List 链表

redis 做消息队列的特性例如FIFO(先入先出)很容易实现,只需要一个 list 对象从头取数据,从尾部塞数据即可。

相关的命令:(1)左侧入右侧出:lpush/rpop;(2)右侧入左侧出:rpush/lpop。

这个简单的消息队列很容易实现。

  • Zset 有序集合

有些任务场景,并不需要任务立刻执行,而是需要延迟执行;有些任务很重要,需要在任务失败的时候重新尝试。这些功能仅仅依靠 list 是无法完成的。这个时候,就需要 redis 的有序集合。

Redis 有序集合和 Redis 集合类似,是不包含相同字符串的合集。它们的差别是,每个有序集合的成员都关联着一个评分 score,这个评分用于把有序集合中的成员按最低分到最高分排列。

单看有序集合和延迟任务并无关系,但是可以将有序集合的评分 score 设置为延时任务开启的时间,之后轮询这个有序集合,将到期的任务拿出来进行处理,这样就实现了延迟任务的功能。

对于重要的需要重试的任务,在任务执行之前,会将该任务放入有序集合中,设置任务最长的执行时间。若任务顺利执行完毕,该任务会在有序集合中删除。如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。

相关命令:

(1) ZADD 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数。

(2) ZRANGEBYSCORE 按分数返回一个成员范围的有序集合。

(3) ZREMRANGEBYRANK 在给定的索引之内删除所有成员的有序集合。

laravel 队列服务的任务调度

队列服务的任务调度过程如下:

Laravel Queue——消息队列任务与分发源码剖析 - 图1

laravel 的队列服务由两个进程控制,一个是生产者,一个是消费者。这两个进程操纵了 redis 三个队列,其中一个 List,负责即时任务,两个 Zset,负责延时任务与待处理任务。

生产者负责向 redis 推送任务,如果是即时任务,默认就会向 queue:default 推送;如果是延时任务,就会向 queue:default:delayed 推送。

消费者轮询两个队列,不断的从队列中取出任务,先把任务放入 queue:default:reserved 中,再执行相关任务。如果任务执行成功,就会删除 queue:default:reserved 中的任务,否则会被重新放入 queue:default:delayed 队列中。

laravel 队列服务的总体流程

任务分发流程:

Laravel Queue——消息队列任务与分发源码剖析 - 图6

任务处理器运作:

Laravel Queue——消息队列任务与分发源码剖析 - 图7

laravel 队列服务的注册与启动

laravel 队列服务需要注册的服务比较多:

  1. class QueueServiceProvider extends ServiceProvider
  2. {
  3. public function register()
  4. {
  5. $this->registerManager();
  6. $this->registerConnection();
  7. $this->registerWorker();
  8. $this->registerListener();
  9. $this->registerFailedJobServices();
  10. }
  11. }

registerManager 注册门面

registerManager 负责注册队列服务的门面类:

  1. protected function registerManager()
  2. {
  3. $this->app->singleton('queue', function ($app) {
  4. return tap(new QueueManager($app), function ($manager) {
  5. $this->registerConnectors($manager);
  6. });
  7. });
  8. }
  9. public function registerConnectors($manager)
  10. {
  11. foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
  12. $this->{"register{$connector}Connector"}($manager);
  13. }
  14. }
  15. protected function registerRedisConnector($manager)
  16. {
  17. $manager->addConnector('redis', function () {
  18. return new RedisConnector($this->app['redis']);
  19. });
  20. }

QueueManager 是队列服务的总门面,提供一切与队列相关的操作接口。QueueManager 中有一个成员变量 $connectors,该成员变量中存储着所有 laravel 支持的底层队列服务:’Database’, ‘Redis’, ‘Beanstalkd’, ‘Sqs’。

  1. class QueueManager implements FactoryContract, MonitorContract
  2. {
  3. public function addConnector($driver, Closure $resolver)
  4. {
  5. $this->connectors[$driver] = $resolver;
  6. }
  7. }

成员变量 $connectors 会被存储各种驱动的 connector,例如 RedisConnectorSqsConnectorDatabaseConnectorBeanstalkdConnector

registerConnection 底层队列连接服务

接下来,就要连接实现队列的底层服务了,例如 redis

  1. protected function registerConnection()
  2. {
  3. $this->app->singleton('queue.connection', function ($app) {
  4. return $app['queue']->connection();
  5. });
  6. }
  7. public function connection($name = null)
  8. {
  9. $name = $name ?: $this->getDefaultDriver();
  10. if (! isset($this->connections[$name])) {
  11. $this->connections[$name] = $this->resolve($name);
  12. $this->connections[$name]->setContainer($this->app);
  13. }
  14. return $this->connections[$name];
  15. }
  16. public function getDefaultDriver()
  17. {
  18. return $this->app['config']['queue.default'];
  19. }

connection 函数首先会获取 连接 名,没有 连接 名就会从 config 中获取默认的连接。

  1. protected function resolve($name)
  2. {
  3. $config = $this->getConfig($name);
  4. return $this->getConnector($config['driver'])
  5. ->connect($config)
  6. ->setConnectionName($name);
  7. }

resolve 函数利用相应的底层驱动 connector 进行连接操作,也就是 connect 函数,该函数会返回 RedisQueue

  1. class RedisConnector implements ConnectorInterface
  2. {
  3. public function connect(array $config)
  4. {
  5. return new RedisQueue(
  6. $this->redis, $config['queue'],
  7. Arr::get($config, 'connection', $this->connection),
  8. Arr::get($config, 'retry_after', 60)
  9. );
  10. }
  11. }

registerWorker 消费者服务注册

消费者的注册服务会返回 Illuminate\Queue\Worker 类:

  1. protected function registerWorker()
  2. {
  3. $this->app->singleton('queue.worker', function () {
  4. return new Worker(
  5. $this->app['queue'], $this->app['events'], $this->app[ExceptionHandler::class]
  6. );
  7. });
  8. }

laravel Bus 服务注册与启动

定义好自己想要的队列类之后,还需要将队列任务推送给底层驱动后台,例如 redis,一般会使用 dispatch 函数:

  1. Job::dispatch();

或者

  1. $job = (new ProcessPodcast($pocast));
  2. dispatch($job);

dispatch 函数就是 Bus 服务,专门用于分发队列任务。

  1. class BusServiceProvider extends ServiceProvider
  2. {
  3. public function register()
  4. {
  5. $this->app->singleton(Dispatcher::class, function ($app) {
  6. return new Dispatcher($app, function ($connection = null) use ($app) {
  7. return $app[QueueFactoryContract::class]->connection($connection);
  8. });
  9. });
  10. $this->app->alias(
  11. Dispatcher::class, DispatcherContract::class
  12. );
  13. $this->app->alias(
  14. Dispatcher::class, QueueingDispatcherContract::class
  15. );
  16. }
  17. }

创建任务

queue 设置

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => 'default',
  4. 'queue' => 'default',
  5. 'retry_after' => 90,
  6. ],

一般来说,默认的 redis 配置如上,connectiondatabaseredis 的连接名称;queueredis 中的队列名称,值得注意的是,如果使用的是 redis 集群的话,这个需要使用 key hash tag,也就是 {default};当任务运行超过 retry_after 这个时间后,该任务会被重新放入队列当中。

任务类的创建

  • 任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle 方法。

  • 如果想要使得任务被推送到队列中,而不是同步执行,那么需要实现 Illuminate\Contracts\Queue\ShouldQueue 接口。

  • 如果想要让任务推送到特定的连接中,例如 redis 或者 sqs,那么需要设置 conneciton 变量。

  • 如果想要让任务推送到特定的队列中去,可以设置 queue 变量。

  • 如果想要让任务延迟推送,那么需要设置 delay 变量。

  • 如果想要设置任务至多重试的次数,可以使用 tries 变量;

  • 如果想要设置任务可以运行的最大秒数,那么可以使用 timeout 参数。

  • 如果想要手动访问队列,可以使用 trait : Illuminate\Queue\InteractsWithQueue

  • 如果队列监听器任务执行次数超过在工作队列中定义的最大尝试次数,监听器的 failed 方法将会被自动调用。 failed 方法接受事件实例和失败的异常作为参数:

  1. class ProcessPodcast implements ShouldQueue
  2. {
  3. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  4. protected $podcast;
  5. public $connection = 'redis';
  6. public $queue = 'test';
  7. public $delay = 30;
  8. public $tries = 5;
  9. public $timeout = 30;
  10. public function __construct(Podcast $podcast)
  11. {
  12. $this->podcast = $podcast;
  13. }
  14. public function handle(AudioProcessor $processor)
  15. {
  16. // Process uploaded podcast...
  17. if (false) {
  18. $this->release(30);
  19. }
  20. }
  21. public function failed(OrderShipped $event, $exception)
  22. {
  23. //
  24. }
  25. }

任务事件

  1. class AppServiceProvider extends ServiceProvider
  2. {
  3. public function boot()
  4. {
  5. //任务运行前
  6. Queue::before(function (JobProcessing $event) {
  7. // $event->connectionName
  8. // $event->job
  9. // $event->job->payload()
  10. });
  11. //任务运行后
  12. Queue::after(function (JobProcessed $event) {
  13. // $event->connectionName
  14. // $event->job
  15. // $event->job->payload()
  16. });
  17. //任务循环前
  18. Queue::looping(function () {
  19. while (DB::transactionLevel() > 0) {
  20. DB::rollBack();
  21. }
  22. });
  23. //任务失败后
  24. Queue::failing(function (JobFailed $event) {
  25. // $event->connectionName
  26. // $event->job
  27. // $event->exception
  28. });
  29. //异常发生后
  30. Queue::exceptionOccurred(function (JobFailed $event) {
  31. // $event->connectionName
  32. // $event->job
  33. // $event->exception
  34. });
  35. }
  36. }

任务的分发

分发服务

  • 写好任务类后,就能通过 dispatch 辅助函数来分发它了。唯一需要传递给 dispatch 的参数是这个任务类的实例:
  1. class PodcastController extends Controller
  2. {
  3. public function store(Request $request)
  4. {
  5. // 创建播客...
  6. ProcessPodcast::dispatch($podcast);
  7. }
  8. }
  • 如果想延迟执行一个队列中的任务,可以用任务实例的 delay 方法。
  1. ProcessPodcast::dispatch($podcast)
  2. ->delay(Carbon::now()->addMinutes(10));
  • 通过推送任务到不同的队列,可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。要指定队列的话,就调用任务实例的 onQueue 方法:
  1. ProcessPodcast::dispatch($podcast)->onQueue('processing');
  • 如果使用了多个队列连接,可以将任务推到指定连接。要指定连接的话,可以在分发任务的时候使用 onConnection 方法:
  1. ProcessPodcast::dispatch($podcast)->onConnection('redis
  2. ');

这些链式的函数是在 traitIlluminate\Foundation\Bus\Dispatchable 的基础上应用的,该 traitdispatch 函数启动:

  1. trait Dispatchable
  2. {
  3. public static function dispatch()
  4. {
  5. return new PendingDispatch(new static(...func_get_args()));
  6. }
  7. }

PendingDispatch 类中定义了链式函数,该函数巧妙在析构函数中,析构函数自动调用全局函数 dispatch

  1. class PendingDispatch
  2. {
  3. public function __construct($job)
  4. {
  5. $this->job = $job;
  6. }
  7. public function onConnection($connection)
  8. {
  9. $this->job->onConnection($connection);
  10. return $this;
  11. }
  12. public function onQueue($queue)
  13. {
  14. $this->job->onQueue($queue);
  15. return $this;
  16. }
  17. public function delay($delay)
  18. {
  19. $this->job->delay($delay);
  20. return $this;
  21. }
  22. public function __destruct()
  23. {
  24. dispatch($this->job);
  25. }
  26. }

各个函数里面的 onConnectiondelayonQueue 等函数是任务中的 traitIlluminate\Bus\Queueable

  1. trait Queueable
  2. {
  3. public function onConnection($connection)
  4. {
  5. $this->connection = $connection;
  6. return $this;
  7. }
  8. public function onQueue($queue)
  9. {
  10. $this->queue = $queue;
  11. return $this;
  12. }
  13. public function delay($delay)
  14. {
  15. $this->delay = $delay;
  16. return $this;
  17. }
  18. }

dispatch 任务分发源码

任务的分发离不开 Bus 服务,可以利用全局函数 dispatch,还可以使用 Dispatchable 这个 trait:

  1. class Dispatcher implements QueueingDispatcher
  2. {
  3. public function dispatch($command)
  4. {
  5. if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
  6. return $this->dispatchToQueue($command);
  7. } else {
  8. return $this->dispatchNow($command);
  9. }
  10. }
  11. protected function commandShouldBeQueued($command)
  12. {
  13. return $command instanceof ShouldQueue;
  14. }
  15. }

我们这里主要看异步的任务:

  1. public function dispatchToQueue($command)
  2. {
  3. $connection = isset($command->connection) ? $command->connection : null;
  4. $queue = call_user_func($this->queueResolver, $connection);
  5. if (! $queue instanceof Queue) {
  6. throw new RuntimeException('Queue resolver did not return a Queue implementation.');
  7. }
  8. if (method_exists($command, 'queue')) {
  9. return $command->queue($queue, $command);
  10. } else {
  11. return $this->pushCommandToQueue($queue, $command);
  12. }
  13. }

进行任务分发之前,首先要利用 queueResolver 连接底层驱动。如果任务类中含有 queue 函数,那么就会利用用户自己的 queue 对驱动进行推送任务。否则就会启动默认的程序:

  1. protected function pushCommandToQueue($queue, $command)
  2. {
  3. if (isset($command->queue, $command->delay)) {
  4. return $queue->laterOn($command->queue, $command->delay, $command);
  5. }
  6. if (isset($command->queue)) {
  7. return $queue->pushOn($command->queue, $command);
  8. }
  9. if (isset($command->delay)) {
  10. return $queue->later($command->delay, $command);
  11. }
  12. return $queue->push($command);
  13. }

我们以 redis 为例,queue 这个类就是 Illuminate\Queue\RedisQueue:

  1. class RedisQueue extends Queue implements QueueContract
  2. {
  3. public function push($job, $data = '', $queue = null)
  4. {
  5. return $this->pushRaw($this->createPayload($job, $data), $queue);
  6. }
  7. public function pushOn($queue, $job, $data = '')
  8. {
  9. return $this->push($job, $data, $queue);
  10. }
  11. public function later($delay, $job, $data = '', $queue = null)
  12. {
  13. return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
  14. }
  15. public function laterOn($queue, $delay, $job, $data = '')
  16. {
  17. return $this->later($delay, $job, $data, $queue);
  18. }
  19. }

我们先看 pushpush 函数调用 pushRaw,在调用之前,要把任务类进行序列化,并且以特定的格式进行 json 序列化:

  1. protected function createPayload($job, $data = '', $queue = null)
  2. {
  3. $payload = json_encode($this->createPayloadArray($job, $data, $queue));
  4. if (JSON_ERROR_NONE !== json_last_error()) {
  5. throw new InvalidPayloadException;
  6. }
  7. return $payload;
  8. }
  9. protected function createPayloadArray($job, $data = '', $queue = null)
  10. {
  11. return is_object($job)
  12. ? $this->createObjectPayload($job)
  13. : $this->createStringPayload($job, $data);
  14. }
  15. protected function createObjectPayload($job)
  16. {
  17. return [
  18. 'job' => 'Illuminate\Queue\CallQueuedHandler@call',
  19. 'maxTries' => isset($job->tries) ? $job->tries : null,
  20. 'timeout' => isset($job->timeout) ? $job->timeout : null,
  21. 'data' => [
  22. 'commandName' => get_class($job),
  23. 'command' => serialize(clone $job),
  24. ],
  25. ];
  26. }
  27. protected function createStringPayload($job, $data)
  28. {
  29. return ['job' => $job, 'data' => $data];
  30. }

格式化数据之后,就会将 json 推送到 redis 队列中,对于非延时的任务,直接调用 rpush 即可:

  1. public function pushRaw($payload, $queue = null, array $options = [])
  2. {
  3. $this->getConnection()->rpush($this->getQueue($queue), $payload);
  4. return Arr::get(json_decode($payload, true), 'id');
  5. }

对于延时的任务,会调用 laterRaw,调用 redis 的有序集合 zadd 函数:

  1. protected function availableAt($delay = 0)
  2. {
  3. return $delay instanceof DateTimeInterface
  4. ? $delay->getTimestamp()
  5. : Carbon::now()->addSeconds($delay)->getTimestamp();
  6. }
  7. protected function laterRaw($delay, $payload, $queue = null)
  8. {
  9. $this->getConnection()->zadd(
  10. $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
  11. );
  12. return Arr::get(json_decode($payload, true), 'id');
  13. }

这样,相关任务就会被分发到 redis 对应的队列中去。